refactor: harden FileCleanupStrategy with retry and parallel deletes#649
refactor: harden FileCleanupStrategy with retry and parallel deletes#649shangxinli wants to merge 2 commits into
Conversation
| if (begin >= items.size()) break; | ||
| std::size_t end = std::min(begin + per, items.size()); | ||
| auto slice = items.subspan(begin, end - begin); | ||
| futures.emplace_back(std::async(std::launch::async, [slice, &work]() { |
There was a problem hiding this comment.
I don't think it is a good idea to use std::async here. Instead, we may want to introduce a thread pool to fully control the resource.
There was a problem hiding this comment.
Good call -- replaced std::async with a dedicated ThreadPool in ff3a254. The pool is owned by FileCleanupStrategy, sized once at construction (min(8, hardware_concurrency)), and drained on destruction, so CleanFiles no longer spins up fresh threads on each DeleteFiles call. Added util/thread_pool_internal.{h,cc} plus ThreadPoolTest coverage (submit, fan-out, exception isolation, drain). PTAL.
…el deletes Addresses review feedback on apache#649: std::async spins up a fresh std::thread per DeleteFiles call (and per worker), which thrashes thread creation when CleanFiles invokes DeleteFiles 3-4 times in a row. Replace with a per-strategy ThreadPool that owns its workers for the lifetime of the strategy. - New util/thread_pool_internal.h / .cc: minimal worker pool with eager thread start, mutex+cv task queue, Submit returning a future<void>, and a RunAndWait fan-out helper for span-of-items workloads. Drained on destruction. - FileCleanupStrategy now holds a ThreadPool sized once at construction (min(8, hardware_concurrency)). DeleteFiles short-circuits empty/single-item batches and otherwise delegates to pool_.RunAndWait. The pool member is declared last so workers are joined before file_io_ and delete_func_ are destroyed. - Drops the RunInParallel free template, the per-call WorkerCount, and the <future>/<span> includes in expire_snapshots.cc. - Adds util_test::ThreadPoolTest covering ctor validation, single submit, fan-out, empty no-op, observed concurrency, exception isolation, and dtor drain. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
6cf99b7 to
ff3a254
Compare
| /// the constructor and joined in the destructor. | ||
| /// | ||
| /// All public methods are thread-safe. | ||
| class ICEBERG_EXPORT ThreadPool { |
There was a problem hiding this comment.
@HuaHuaY is already working on a design of thread pool that with pluggability. I'd prefer to pause this PR until we finalize the design.
There was a problem hiding this comment.
I have submitted a new PR #687, which allows users to use their own thread pools through our specified interface.
There was a problem hiding this comment.
Thanks to @HuaHuaY, #687 is merged. Do you want to rebase on it, @shangxinli?
There was a problem hiding this comment.
Rebased onto #687 — force-pushed as 7a329ad. Single commit on top of current main.
Changes:
FileCleanupStrategynow takes anOptionalExecutor. When a customDeleteWith()callback is set, per-path deletes fan out throughTaskGroup<>(uses the executor if provided, otherwise runs serially viaRunTasksSingleThreaded).- The
FileIO::DeleteFilesbulk path is wrapped inRetryRunner<retry::StopRetryOn<ErrorKind::kNotFound>>with a tight budget (2 retries, 5s total). Mirrors Java'sTasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N). - Added
ExpireSnapshots::Executor(OptionalExecutor)so callers can opt in to parallel deletion; threaded through bothIncrementalFileCleanupandReachableFileCleanup. - Dropped the
std::async/std::thread/std::spanmachinery and the ad-hoc retry loop in favor ofutil/task_group.handutil/retry_util.hfrom feat: add executor pool support #687. - New
ExecutorDispatchesDeletesConcurrentlytest wires atest::ThreadExecutorthrough the new API and asserts one submission per file.
There was a problem hiding this comment.
Heads up @wgtmac — quick self-review pass after the rebase, force-pushed as 70d507d (amends 7a329ad):
- Renamed the new setter
Executor()→ExecuteDeleteWith()to match Java'sexecuteDeleteWith(ExecutorService)and remove the name shadow overiceberg::Executor. - Sharpened the comment above the bulk-path
RetryRunnerto be honest about its limits: it helps atomic-bulk FileIO impls (e.g. S3DeleteObjects); for the default fail-fast iterative impl a retried attempt re-submits the full path list and may short-circuit early onkNotFound. Still best-effort, no worse than the prior un-retried call. - Refreshed the PR description body — it had been describing the old
std::async-based approach.
No behavioural changes vs 7a329ad; same 26 tests passing locally.
6f8ae36 to
7a329ad
Compare
Rebases onto apache#687's executor pool support per @wgtmac's review: * FileCleanupStrategy now takes an OptionalExecutor in its constructor. When the custom DeleteWith() callback is configured, per-path deletes fan out through a TaskGroup that uses the executor (or runs serially when none is set, preserving prior behavior). * The FileIO bulk delete path wraps file_io_->DeleteFiles in a tight RetryRunner<retry::StopRetryOn<kNotFound>> so transient FileIO errors no longer give up after the first attempt. Mirrors Java's Tasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N). * Adds ExpireSnapshots::Executor(OptionalExecutor) so callers can opt in to parallel deletion, and threads it down through both IncrementalFileCleanup and ReachableFileCleanup. * Drops the std::async / std::thread / std::span machinery and the ad-hoc retry loop -- replaced by util/task_group.h and util/retry_util.h from apache#687. Adds ExecutorDispatchesDeletesConcurrently test that wires a test::ThreadExecutor through the new API and asserts the executor received one submission per file.
7a329ad to
70d507d
Compare
| // Custom callback path: invoke one path at a time, optionally on a worker thread | ||
| // pulled from the configured executor. Without an executor TaskGroup runs the | ||
| // callbacks synchronously on the calling thread. | ||
| TaskGroup<> group; |
There was a problem hiding this comment.
You can use TaskGroup<retry::StopRetryOn<ErrorKind::kNotFound>> group(kDeleteRetryConfig) to simplify code.
There was a problem hiding this comment.
My mistakes. I confused this with line 128 and didn’t notice that there is no retry here. If you need to retry in the future, you can do this.
Per HuaHuaY's review feedback on apache#649, replace the separate RetryRunner<retry::StopRetryOn<ErrorKind::kNotFound>> block with a TaskGroup<retry::StopRetryOn<ErrorKind::kNotFound>> constructed from kDeleteRetryConfig. Both the bulk FileIO::DeleteFiles call and the per-path delete_func_ callbacks now submit into the same TaskGroup, so there's only one retry/executor construct in DeleteFiles. No behaviour change: the bulk path still gets the bounded retry that short-circuits on kNotFound, and the custom-callback path is unaffected by the retry policy since its lambda swallows exceptions and always returns OK.
| std::vector<std::string> path_list(paths.begin(), paths.end()); | ||
| std::ignore = file_io_->DeleteFiles(path_list); | ||
| if (paths.empty()) return; | ||
| std::vector<std::string> path_list(paths.begin(), paths.end()); |
There was a problem hiding this comment.
We only need to create the vector when there is not delete_func_.
Rebased onto #687's executor pool support per @wgtmac's review.
What
FileCleanupStrategynow takes anOptionalExecutorin its constructor. WhenDeleteWith()is configured, per-path deletes fan out throughTaskGroup<>-- uses the supplied executor when set, otherwise runs the callbacks serially on the calling thread viaRunTasksSingleThreaded. Preserves prior single-threaded behavior by default.FileIO::DeleteFilesbulk path is wrapped inRetryRunner<retry::StopRetryOn<ErrorKind::kNotFound>>with a tight budget (2 retries, 100ms-1s backoff, 5s total). Mirrors Java'sTasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N). The retry primarily helps atomic-bulk FileIO impls (e.g. an S3DeleteObjects-backed FileIO) ride out transient throttles -- best-effort, see code comment for the fail-fast-iterative caveat.ExpireSnapshots::ExecuteDeleteWith(OptionalExecutor)(named after Java'sexecuteDeleteWith(ExecutorService)) so callers opt in to parallel deletion; threaded through bothIncrementalFileCleanupandReachableFileCleanup.DeleteWith()doc note clarifies that the user-supplied callback may be invoked concurrently from worker threads and must be thread-safe.std::async/std::thread/std::spanmachinery and the ad-hoc retry loop -- replaced byutil/task_group.handutil/retry_util.hfrom feat: add executor pool support #687.Test coverage
Existing 25
ExpireSnapshots*tests continue to pass unchanged (they hit the default no-executor path).Adds
ExpireSnapshotsCleanupTest.ExecutorDispatchesDeletesConcurrently-- wires atest::ThreadExecutorthroughExecuteDeleteWith(), runs anExpireSnapshotIdcleanup with a customDeleteWithcallback, and asserts the executor received one submission per file (data + manifest + manifest-list = 3) and that all paths show up in the mutex-guarded sink.